package org.jetlinks.pro.clickhouse;

import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Recycler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
public class RestfulClickHouseOperations implements ClickHouseOperations {
    static DataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);

    private final ClickHouseProperties properties;

    private final WebClient client;

    private Disposable disposable;

    private static final byte LB = '(';
    private static final byte RB = ')';
    private static final byte comma = ',';

    private FluxSink<Buffer> sink;

    public RestfulClickHouseOperations(ClickHouseProperties properties) {
        this.properties = properties;
        this.client = properties.getRestful().create();
        init();
    }

    private void init() {
        disposable = Flux
            .<Buffer>create(sink -> this.sink = sink)
            //按表分组
            .groupBy(Buffer::getTable, Integer.MAX_VALUE)
            .flatMap(group -> {
                String table = group.key();
                return group
                    .map(Buffer::release)
                    .windowTimeout(properties.getMaxBatchSize(), properties.getBatchDuration())
                    .flatMap(window -> this
                        .insert(table, window.cache())
                        .as(mono ->
                                properties.getMaxRetry() > 0
                                    ? mono.retryWhen(Retry.backoff(properties.getMaxRetry(), properties.getRetryDuration()))
                                    : mono)
                        .onErrorResume(err -> {
                            log.error("write clickhouse [{}] data error", table, err);
                            return Mono.empty();
                        }));
            }, Integer.MAX_VALUE)
            .doOnDiscard(Buffer.class, Buffer::release)
            .subscribe();

    }

    public void destroy() {
        disposable.dispose();
    }

    private DataBuffer createDataBuffer(String[] columns, List<Map<String, Object>> list) {
        DataBuffer buffer = bufferFactory.allocateBuffer(list.size() * 256);

        // (1,2,3),(1,2,3)
        for (Map<String, Object> values : list) {
            if (buffer.writePosition() > 0) {
                buffer.write(comma);
            }
            buffer.write(LB);
            for (int i = 0; i < columns.length; i++) {
                if (i != 0) {
                    buffer.write(comma);
                }
                Object value = values.get(columns[i]);
                buffer.write(createSqlValue(value), StandardCharsets.UTF_8);
            }
            buffer.write(RB);
        }
        return buffer;
    }

    private String createInsertSql(String table, String[] columns) {
        StringJoiner joiner = new StringJoiner(",", "INSERT INTO " + table + "(", ") values");
        for (String column : columns) {
            joiner.add(column);
        }
        return joiner.toString();
    }

    @Override
    public Mono<Void> insert(String table, Map<String, Object> value) {
        sink.next(Buffer.of(table, value));
        return Mono.empty();
    }

    static Recycler<Buffer> pool = new Recycler<Buffer>() {
        @Override
        protected Buffer newObject(Handle<Buffer> handle) {
            return new Buffer(handle);
        }
    };

    static class Buffer {
        private final Recycler.Handle<Buffer> handle;

        @Getter
        private String table;
        @Getter
        private Map<String, Object> data;

        private Buffer(Recycler.Handle<Buffer> handle) {
            this.handle = handle;
        }

        public static Buffer of(String table, Map<String, Object> data) {
            Buffer buffer;
            try {
                buffer = pool.get();
            } catch (Throwable e) {
                buffer = new Buffer(null);
            }
            buffer.table = table;
            buffer.data = data;
            return buffer;
        }

        Map<String, Object> release() {
            Map<String, Object> temp = this.data;
            if (handle != null) {
                this.table = null;
                this.data = null;
                handle.recycle(this);
            }
            return temp;
        }

    }

    @Override
    public String getDatabase() {
        String database = properties.getRestful().getDatabase();
        if (StringUtils.isEmpty(database)){
            return properties.getRestful().getUsername();
        }
        return database;

    }

    @Override
    public Mono<Void> insert(String table, Flux<Map<String, Object>> valueStream) {
        return valueStream
            .switchOnFirst((signal, mapFlux) -> {
                if (!signal.isOnNext()) {
                    return Mono.empty();
                }
                Map<String, Object> first = signal.get();
                if (first == null) {
                    return Mono.error(new IllegalArgumentException("first data can not be null"));
                }
                //从第一条数据里取出列
                String[] columns = first.keySet().toArray(new String[0]);
                //insert into table(column1,column2,....) values
                String SQL = createInsertSql(table, columns);
                log.debug("execute clickhouse insert: {}",SQL);
                AtomicLong time = new AtomicLong();
                AtomicLong count = new AtomicLong();
                DataBuffer sqlHead = bufferFactory.wrap(SQL.getBytes());
                return client
                    .post()
//                    .uri(builder -> builder.path("/").queryParam("query", SQL).build())
                    // FIXME: 2021/4/28 应该按数据长度来缓冲?
                    .body(Flux
                              .concat(
                                  //SQL头
                                  Flux.just(sqlHead),
                                  //数据
                                  mapFlux
                                      .buffer(5000)
                                      .map(list -> {
                                          count.addAndGet(list.size());
                                          return createDataBuffer(columns, list);
                                      })
                              )
                              .doOnComplete(() -> time.set(System.currentTimeMillis()))
                        , DataBuffer.class)
                    .retrieve()
                    .onStatus(HttpStatus::isError, ClickHouseException::of)
                    .bodyToMono(Void.class)
                    .doOnSuccess(ignore -> log.trace("保存ClickHouse[{}]数据成功,数量:{},耗时:{}ms",
                                                     table,
                                                     count.get(),
                                                     System.currentTimeMillis() - time.get()));
            })
            .then();
    }

    private String createSqlValue(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number) {
            return String.valueOf(value);
        }
        if (value instanceof String) {
            return "'".concat(String.valueOf(value)).concat("'");
        }

        return String.valueOf(value);
    }

    @Override
    public Mono<Void> execute(String sql) {
        log.debug("execute clickhouse: {}", sql);
        return client
            .post()
            .bodyValue(sql)
            .retrieve()
            .onStatus(HttpStatus::isError, ClickHouseException::of)
            .bodyToMono(Void.class)
            .doOnError(err -> log.warn("execute clickhouse error : {} ", sql, err));
    }

    @Override
    public <R> Flux<R> query(String sql, ResultWrapper<R, ?> wrapper) {
        //默认使用JSON格式返回查询结果
        Tuple2<String, Format> queryAndFormat = parseFormat(sql, DefaultFormat.JSON);
        log.debug("execute clickhouse query: {}", queryAndFormat.getT1());
        return queryAndFormat
            .getT2()
            .parse(
                client
                    .post()
//                    .uri(builder -> builder.path("/").queryParam("query", queryAndFormat.getT1()).build())
                    .bodyValue(queryAndFormat.getT1())
                    .retrieve()
                    .onStatus(HttpStatus::isError, ClickHouseException::of)
                    .bodyToFlux(DataBuffer.class), wrapper
            ).doOnError(err -> log.warn("execute clickhouse query error : {} ", queryAndFormat.getT1(), err));
    }

    /**
     * 解析出SQL中的format,如果SQL中没有指定format则使用参数指定的format
     *
     * @param sql    SQL
     * @param format format
     */

    private Tuple2<String, Format> parseFormat(String sql, Format format) {
        String[] arr = sql.split("[ ]");
        if (!arr[arr.length - 2].equalsIgnoreCase("FORMAT")) {
            arr = Arrays.copyOf(arr, arr.length + 2);
            arr[arr.length - 2] = "FORMAT";
        }
        arr[arr.length - 1] = format.getId();
        return Tuples
            .of(
                //SQL
                String.join(" ", arr),
                //FORMAT
                Format
                    .lookup(arr[arr.length - 1])
                    .orElseThrow(() -> new ClickHouseException("unsupported format :" + format.getId()))
            );
    }
}
